{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed training\n",
    "\n",
    "<div align=\"left\">\n",
    "<a target=\"_blank\" href=\"https://console.anyscale.com/\"><img src=\"https://raw.githubusercontent.com/ray-project/ray/c34b74c22a9390aa89baf80815ede59397786d2e/doc/source/_static/img/run-on-anyscale.svg\"></a>&nbsp;\n",
    "\n",
    "<a href=\"https://github.com/anyscale/foundational-ray-app\" role=\"button\"><img src=\"https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d\"></a>&nbsp;\n",
    "</div>\n",
    "\n",
    "This tutorial executes a distributed training workload that connects the following heterogeneous workloads:\n",
    "- preprocess the dataset prior to training\n",
    "- distributed training with Ray Train and PyTorch with observability\n",
    "- evaluation (batch inference and eval logic)\n",
    "- save model artifacts to a model registry (MLOps)\n",
    "\n",
    "**Note**: this tutorial doesn't tune the model but see [Ray Tune](https://docs.ray.io/en/latest/tune/index.html) for experiment execution and hyperparameter tuning at any scale.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/distributed_training.png\" width=800>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[92mSuccessfully registered `matplotlib, torch` and 4 other packages to be installed on all cluster nodes.\u001b[0m\n",
      "\u001b[92mView and update dependencies here: https://console.anyscale.com/cld_kvedZWag2qA8i5BjxUevf5i7/prj_cz951f43jjdybtzkx1s5sjgz99/workspaces/expwrk_eys8cskj5aivghbf773dp2vmcd?workspace-tab=dependencies\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "pip install -q \"matplotlib==3.10.0\" \"torch==2.7.0\" \"transformers==4.52.3\" \"scikit-learn==1.6.0\" \"mlflow==2.19.0\" \"ipywidgets==8.1.3\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload all"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import ray\n",
    "import sys\n",
    "sys.path.append(os.path.abspath(\"..\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:27:36,501\tINFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.0.56.137:6379...\n",
      "2025-05-29 17:27:36,512\tINFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at \u001b[1m\u001b[32mhttps://session-gcwehd9xxjzkv5lxv8lgcdgx2n.i.anyscaleuserdata.com \u001b[39m\u001b[22m\n",
      "2025-05-29 17:27:36,600\tINFO packaging.py:575 -- Creating a file package for local module '../'.\n",
      "2025-05-29 17:27:36,665\tWARNING packaging.py:417 -- File /home/ray/default/foundational-ray-app/notebooks/../.git/objects/pack/pack-b8b7f3cf34764341ace726e9197e18f11b5aaedc.pack is very large (15.84MiB). Consider adding this file to the 'excludes' list to skip uploading it: `ray.init(..., runtime_env={'excludes': ['/home/ray/default/foundational-ray-app/notebooks/../.git/objects/pack/pack-b8b7f3cf34764341ace726e9197e18f11b5aaedc.pack']})`\n",
      "2025-05-29 17:27:36,743\tINFO packaging.py:367 -- Pushing file package 'gcs://_ray_pkg_01dff90bc6ace53f.zip' (29.50MiB) to Ray cluster...\n",
      "2025-05-29 17:27:36,886\tINFO packaging.py:380 -- Successfully pushed file package 'gcs://_ray_pkg_01dff90bc6ace53f.zip'.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "33d35af1c9a04d46991a09924de9c1e7",
       "version_major": 2,
       "version_minor": 0
      },
      "text/html": [
       "<div class=\"lm-Widget p-Widget lm-Panel p-Panel jp-Cell-outputWrapper\">\n",
       "    <div style=\"margin-left: 50px;display: flex;flex-direction: row;align-items: center\">\n",
       "        <div class=\"jp-RenderedHTMLCommon\" style=\"display: flex; flex-direction: row;\">\n",
       "  <svg viewBox=\"0 0 567 224\" fill=\"none\" xmlns=\"http://www.w3.org/2000/svg\" style=\"height: 3em;\">\n",
       "    <g clip-path=\"url(#clip0_4338_178347)\">\n",
       "        <path d=\"M341.29 165.561H355.29L330.13 129.051C345.63 123.991 354.21 112.051 354.21 94.2307C354.21 71.3707 338.72 58.1807 311.88 58.1807H271V165.561H283.27V131.661H311.8C314.25 131.661 316.71 131.501 319.01 131.351L341.25 165.561H341.29ZM283.29 119.851V70.0007H311.82C331.3 70.0007 342.34 78.2907 342.34 94.5507C342.34 111.271 331.34 119.861 311.82 119.861L283.29 119.851ZM451.4 138.411L463.4 165.561H476.74L428.74 58.1807H416L367.83 165.561H380.83L392.83 138.411H451.4ZM446.19 126.601H398L422 72.1407L446.24 126.601H446.19ZM526.11 128.741L566.91 58.1807H554.35L519.99 114.181L485.17 58.1807H472.44L514.01 129.181V165.541H526.13V128.741H526.11Z\" fill=\"var(--jp-ui-font-color0)\"/>\n",
       "        <path d=\"M82.35 104.44C84.0187 97.8827 87.8248 92.0678 93.1671 87.9146C98.5094 83.7614 105.083 81.5067 111.85 81.5067C118.617 81.5067 125.191 83.7614 130.533 87.9146C135.875 92.0678 139.681 97.8827 141.35 104.44H163.75C164.476 101.562 165.622 98.8057 167.15 96.2605L127.45 56.5605C121.071 60.3522 113.526 61.6823 106.235 60.3005C98.9443 58.9187 92.4094 54.9203 87.8602 49.0574C83.3109 43.1946 81.0609 35.8714 81.5332 28.4656C82.0056 21.0599 85.1679 14.0819 90.4252 8.8446C95.6824 3.60726 102.672 0.471508 110.08 0.0272655C117.487 -0.416977 124.802 1.86091 130.647 6.4324C136.493 11.0039 140.467 17.5539 141.821 24.8501C143.175 32.1463 141.816 39.6859 138 46.0505L177.69 85.7505C182.31 82.9877 187.58 81.4995 192.962 81.4375C198.345 81.3755 203.648 82.742 208.33 85.3976C213.012 88.0532 216.907 91.9029 219.616 96.5544C222.326 101.206 223.753 106.492 223.753 111.875C223.753 117.258 222.326 122.545 219.616 127.197C216.907 131.848 213.012 135.698 208.33 138.353C203.648 141.009 198.345 142.375 192.962 142.313C187.58 142.251 182.31 140.763 177.69 138L138 177.7C141.808 184.071 143.155 191.614 141.79 198.91C140.424 206.205 136.44 212.75 130.585 217.313C124.731 221.875 117.412 224.141 110.004 223.683C102.596 223.226 95.6103 220.077 90.3621 214.828C85.1139 209.58 81.9647 202.595 81.5072 195.187C81.0497 187.779 83.3154 180.459 87.878 174.605C92.4405 168.751 98.9853 164.766 106.281 163.401C113.576 162.035 121.119 163.383 127.49 167.19L167.19 127.49C165.664 124.941 164.518 122.182 163.79 119.3H141.39C139.721 125.858 135.915 131.673 130.573 135.826C125.231 139.98 118.657 142.234 111.89 142.234C105.123 142.234 98.5494 139.98 93.2071 135.826C87.8648 131.673 84.0587 125.858 82.39 119.3H60C58.1878 126.495 53.8086 132.78 47.6863 136.971C41.5641 141.163 34.1211 142.972 26.7579 142.059C19.3947 141.146 12.6191 137.574 7.70605 132.014C2.79302 126.454 0.0813599 119.29 0.0813599 111.87C0.0813599 104.451 2.79302 97.2871 7.70605 91.7272C12.6191 86.1673 19.3947 82.5947 26.7579 81.6817C34.1211 80.7686 41.5641 82.5781 47.6863 86.7696C53.8086 90.9611 58.1878 97.2456 60 104.44H82.35ZM100.86 204.32C103.407 206.868 106.759 208.453 110.345 208.806C113.93 209.159 117.527 208.258 120.522 206.256C123.517 204.254 125.725 201.276 126.771 197.828C127.816 194.38 127.633 190.677 126.253 187.349C124.874 184.021 122.383 181.274 119.205 179.577C116.027 177.88 112.359 177.337 108.826 178.042C105.293 178.746 102.113 180.654 99.8291 183.44C97.5451 186.226 96.2979 189.718 96.3 193.32C96.2985 195.364 96.7006 197.388 97.4831 199.275C98.2656 201.163 99.4132 202.877 100.86 204.32ZM204.32 122.88C206.868 120.333 208.453 116.981 208.806 113.396C209.159 109.811 208.258 106.214 206.256 103.219C204.254 100.223 201.275 98.0151 197.827 96.97C194.38 95.9249 190.676 96.1077 187.348 97.4873C184.02 98.8669 181.274 101.358 179.577 104.536C177.879 107.714 177.337 111.382 178.041 114.915C178.746 118.448 180.653 121.627 183.439 123.911C186.226 126.195 189.717 127.443 193.32 127.44C195.364 127.443 197.388 127.042 199.275 126.259C201.163 125.476 202.878 124.328 204.32 122.88ZM122.88 19.4205C120.333 16.8729 116.981 15.2876 113.395 14.9347C109.81 14.5817 106.213 15.483 103.218 17.4849C100.223 19.4868 98.0146 22.4654 96.9696 25.9131C95.9245 29.3608 96.1073 33.0642 97.4869 36.3922C98.8665 39.7202 101.358 42.4668 104.535 44.1639C107.713 45.861 111.381 46.4036 114.914 45.6992C118.447 44.9949 121.627 43.0871 123.911 40.301C126.195 37.515 127.442 34.0231 127.44 30.4205C127.44 28.3772 127.038 26.3539 126.255 24.4664C125.473 22.5788 124.326 20.8642 122.88 19.4205ZM19.42 100.86C16.8725 103.408 15.2872 106.76 14.9342 110.345C14.5813 113.93 15.4826 117.527 17.4844 120.522C19.4863 123.518 22.4649 125.726 25.9127 126.771C29.3604 127.816 33.0638 127.633 36.3918 126.254C39.7198 124.874 42.4664 122.383 44.1635 119.205C45.8606 116.027 46.4032 112.359 45.6988 108.826C44.9944 105.293 43.0866 102.114 40.3006 99.8296C37.5145 97.5455 34.0227 96.2983 30.42 96.3005C26.2938 96.3018 22.337 97.9421 19.42 100.86ZM100.86 100.86C98.3125 103.408 96.7272 106.76 96.3742 110.345C96.0213 113.93 96.9226 117.527 98.9244 120.522C100.926 123.518 103.905 125.726 107.353 126.771C110.8 127.816 114.504 127.633 117.832 126.254C121.16 124.874 123.906 122.383 125.604 119.205C127.301 116.027 127.843 112.359 127.139 108.826C126.434 105.293 124.527 102.114 121.741 99.8296C118.955 97.5455 115.463 96.2983 111.86 96.3005C109.817 96.299 107.793 96.701 105.905 97.4835C104.018 98.2661 102.303 99.4136 100.86 100.86Z\" fill=\"#00AEEF\"/>\n",
       "    </g>\n",
       "    <defs>\n",
       "        <clipPath id=\"clip0_4338_178347\">\n",
       "            <rect width=\"566.93\" height=\"223.75\" fill=\"white\"/>\n",
       "        </clipPath>\n",
       "    </defs>\n",
       "  </svg>\n",
       "</div>\n",
       "\n",
       "        <table class=\"jp-RenderedHTMLCommon\" style=\"border-collapse: collapse;color: var(--jp-ui-font-color1);font-size: var(--jp-ui-font-size1);\">\n",
       "    <tr>\n",
       "        <td style=\"text-align: left\"><b>Python version:</b></td>\n",
       "        <td style=\"text-align: left\"><b>3.12.9</b></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td style=\"text-align: left\"><b>Ray version:</b></td>\n",
       "        <td style=\"text-align: left\"><b>2.44.1</b></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "    <td style=\"text-align: left\"><b>Dashboard:</b></td>\n",
       "    <td style=\"text-align: left\"><b><a href=\"http://session-gcwehd9xxjzkv5lxv8lgcdgx2n.i.anyscaleuserdata.com\" target=\"_blank\">http://session-gcwehd9xxjzkv5lxv8lgcdgx2n.i.anyscaleuserdata.com</a></b></td>\n",
       "</tr>\n",
       "\n",
       "</table>\n",
       "\n",
       "    </div>\n",
       "</div>\n"
      ],
      "text/plain": [
       "RayContext(dashboard_url='session-gcwehd9xxjzkv5lxv8lgcdgx2n.i.anyscaleuserdata.com', python_version='3.12.9', ray_version='2.44.1', ray_commit='66b3699996151045a71991762f4f83ecc03f881e')"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Enable Ray Train v2. It's too good to wait for public release!\n",
    "os.environ[\"RAY_TRAIN_V2_ENABLED\"] = \"1\"\n",
    "ray.init(\n",
    "    address=os.environ.get(\"RAY_ADDRESS\", \"auto\"),\n",
    "    runtime_env={\n",
    "        \"env_vars\": {\"RAY_TRAIN_V2_ENABLED\": \"1\"}, \n",
    "        \"working_dir\": \"../\",  # to import doggos (default working_dir=\".\")\n",
    "    },\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# This will be removed once Ray Train v2 is part of latest Ray version.\n",
    "echo \"RAY_TRAIN_V2_ENABLED=1\" > /home/ray/default/.env"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Load env vars in notebooks.\n",
    "from dotenv import load_dotenv\n",
    "load_dotenv()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Preprocess"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You need to convert the classes to labels (unique integers) so that you can train a classifier that can correctly predict the class given an input image. But before you do this, apply the same data ingestion and preprocessing as the previous notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_class(row):\n",
    "    row[\"class\"] = row[\"path\"].rsplit(\"/\", 3)[-2]\n",
    "    return row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Preprocess data splits.\n",
    "train_ds = ray.data.read_images(\"s3://doggos-dataset/train\", include_paths=True, shuffle=\"files\")\n",
    "train_ds = train_ds.map(add_class)\n",
    "val_ds = ray.data.read_images(\"s3://doggos-dataset/val\", include_paths=True)\n",
    "val_ds = val_ds.map(add_class)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a `Preprocessor` class that:\n",
    "- creates an embedding. A later step moves the embedding layer outside of the model since you freeze the embedding layer's weights and so you don't have to do it repeatedly as part of the model's forward pass, saving on unnecessary compute.\n",
    "- convert the classes into labels for the classifier. \n",
    "\n",
    "While you could've just done this step as a simple operation, you're taking the time to organize it as a class so that you can save and load for inference later."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from doggos.embed import EmbeddingGenerator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Preprocessor:\n",
    "    \"\"\"Preprocessor class.\"\"\"\n",
    "    def __init__(self, class_to_label=None):\n",
    "        self.class_to_label = class_to_label or {}  # mutable defaults\n",
    "        self.label_to_class = {v: k for k, v in self.class_to_label.items()}\n",
    "        \n",
    "    def fit(self, ds, column):\n",
    "        self.classes = ds.unique(column=column)\n",
    "        self.class_to_label = {tag: i for i, tag in enumerate(self.classes)}\n",
    "        self.label_to_class = {v: k for k, v in self.class_to_label.items()}\n",
    "        return self\n",
    "\n",
    "    def convert_to_label(self, row, class_to_label):\n",
    "        if \"class\" in row:\n",
    "            row[\"label\"] = class_to_label[row[\"class\"]]\n",
    "        return row\n",
    "    \n",
    "    def transform(self, ds, concurrency=4, batch_size=64, num_gpus=1):\n",
    "        ds = ds.map(\n",
    "            self.convert_to_label, \n",
    "            fn_kwargs={\"class_to_label\": self.class_to_label},\n",
    "        )\n",
    "        ds = ds.map_batches(\n",
    "            EmbeddingGenerator,\n",
    "            fn_constructor_kwargs={\"model_id\": \"openai/clip-vit-base-patch32\"},\n",
    "            fn_kwargs={\"device\": \"cuda\"}, \n",
    "            concurrency=concurrency, \n",
    "            batch_size=batch_size,\n",
    "            num_gpus=num_gpus,\n",
    "            accelerator_type=\"L4\",\n",
    "        )\n",
    "        ds = ds.drop_columns([\"image\"])\n",
    "        return ds\n",
    "\n",
    "    def save(self, fp):\n",
    "        with open(fp, \"w\") as f:\n",
    "            json.dump(self.class_to_label, f)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:27:48,949\tINFO dataset.py:2809 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n",
      "2025-05-29 17:27:48,960\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:27:48,960\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "0407d1a024934c91b057ec0c1639f125",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "fbc426836b6846959004387547b7b537",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "36d03f454e5f40efb5b76b684adb5e7a",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "94807e4ad66746d981510dd1c13951b8",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "55457eb98eed4f4dba8a9d5856d24f39",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Aggregate 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "78be104361eb4220a7bbcd0df81ba466",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Sort Sample 5:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "11f157fe050b4711bc36591d51683979",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Shuffle Map 6:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "7229559456c54c2bbf4404043b525f6a",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Shuffle Reduce 7:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "61ad98218196419bb69fbe77dfe83718",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- limit=1 8: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m >>> [DBG] partition_files: before: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __path: [[\"doggos-dataset/train/basset/basset_10028.jpg\",\"doggos-dataset/train/basset/basset_10054.jpg\",\"doggos-dataset/train/basset/basset_10072.jpg\",\"doggos-dataset/train/basset/basset_10095.jpg\",\"doggos-dataset/train/basset/basset_10110.jpg\",...,\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_889.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9618.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_962.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_967.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9739.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __file_size: [[56919,36417,21093,23721,12511,...,19267,43746,29862,37592,32578]]\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m >>> [DBG] partition_files: after: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __path: [[\"doggos-dataset/train/collie/collie_873.jpg\",\"doggos-dataset/train/chow/chow_6164.jpg\",\"doggos-dataset/train/great_dane/great_dane_22413.jpg\",\"doggos-dataset/train/bull_mastiff/bull_mastiff_3641.jpg\",\"doggos-dataset/train/pug/pug_2777.jpg\",...,\"doggos-dataset/train/saint_bernard/saint_bernard_7016.jpg\",\"doggos-dataset/train/boxer/boxer_3258.jpg\",\"doggos-dataset/train/german_shepherd/german_shepherd_1451.jpg\",\"doggos-dataset/train/italian_greyhound/italian_greyhound_722.jpg\",\"doggos-dataset/train/dingo/dingo_1228.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=3813, ip=10.0.153.142)\u001b[0m __file_size: [[12220,20577,60063,22426,18320,...,29927,12190,49104,63901,22386]]\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:27:56,241\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:27:56,242\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> LimitOperator[limit=1]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "ff7d99b9a60e4fd49c26a0967f0a75cb",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=5046, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "3ef5a5062dd1466e811f49b41e0378e0",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "a79a45c863504cf3a927e045c69c497b",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2cec202eb5024b27a1ff86497e616b46",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2f618f46ff6448c38b5219a43705d11b",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "5db5f471220c4c02abae208f5a9f307f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns) 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "f444d3d0896f44b087478c6d983df44f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- limit=1 6: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: before: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m __path: string\u001b[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)\u001b[0m\n",
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m __file_size: [[36177,23609,26989,23153,12625,...,16142,30551,23825,76180,51123]]\u001b[32m [repeated 4x across cluster]\u001b[0m\n",
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m ----\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m __path: [[\"doggos-dataset/train/bloodhound/bloodhound_8518.jpg\",\"doggos-dataset/train/eskimo_dog/eskimo_dog_3946.jpg\",\"doggos-dataset/train/toy_poodle/toy_poodle_8951.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_2223.jpg\",\"doggos-dataset/train/doberman/doberman_4013.jpg\",...,\"doggos-dataset/train/pomeranian/pomeranian_6208.jpg\",\"doggos-dataset/train/great_dane/great_dane_33047.jpg\",\"doggos-dataset/train/german_shepherd/german_shepherd_16014.jpg\",\"doggos-dataset/train/french_bulldog/french_bulldog_571.jpg\",\"doggos-dataset/train/labrador_retriever/labrador_retriever_8242.jpg\"]]\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
      "\u001b[36m(ListFiles pid=20803, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: after: pyarrow.Table\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'path': 'doggos-dataset/train/bull_mastiff/bull_mastiff_2990.jpg',\n",
       "  'class': 'bull_mastiff',\n",
       "  'label': 31,\n",
       "  'embedding': array([ 2.96250284e-02,  1.84831917e-01, -4.36195195e-01, -1.16633832e-01,\n",
       "          1.68539539e-01, -5.36402464e-01,  1.98912546e-02,  2.06453040e-01,\n",
       "          3.48448306e-02,  3.00380349e-01, -5.23912236e-02, -1.01421580e-01,\n",
       "          2.53218114e-01,  1.93473831e-01, -4.80942428e-04, -1.59629315e-01,\n",
       "          4.49332625e-01, -3.90984029e-01, -4.47450221e-01, -5.69370985e-02,\n",
       "         -9.09840584e-01, -3.40499207e-02, -1.65381983e-01, -3.06253433e-02,\n",
       "         -1.22489721e-01, -8.06758553e-02,  8.20637792e-02, -2.05733925e-01,\n",
       "         -4.27418947e-02,  2.88848668e-01, -2.05624640e-01,  4.98024315e-01,\n",
       "         -9.09496695e-02,  1.60129428e-01, -4.21667337e-01,  4.68383253e-01,\n",
       "          2.99857140e-01, -1.89174742e-01,  9.86181274e-02,  1.56004095e+00,\n",
       "         -1.84508055e-01,  4.93547320e-01, -1.84162885e-01, -1.04826376e-01,\n",
       "          2.28193998e-02, -8.93343449e-01, -4.13281739e-01,  1.20442718e-01,\n",
       "          5.22895530e-03, -9.38682407e-02, -4.32153523e-01,  6.56608343e-02,\n",
       "         -1.35297015e-01,  2.46281177e-01,  3.40996623e-01, -1.62015036e-01,\n",
       "         -5.32976687e-01, -3.91260348e-02, -3.67479920e-01, -4.15423289e-02,\n",
       "         -8.94722193e-02,  1.72933936e-01,  4.23605710e-01, -5.93728721e-02,\n",
       "         -3.98973435e-01,  7.98963234e-02,  2.52494738e-02,  8.36879462e-02,\n",
       "          2.78915018e-01, -5.52195311e-02,  1.31553769e-01,  6.21216968e-02,\n",
       "         -3.64192098e-01,  2.43189603e-01,  6.74815893e-01,  3.29109073e-01,\n",
       "         -1.83294237e-01, -2.46163577e-01,  3.03905368e-01,  3.38399976e-01,\n",
       "          3.37341428e-03, -5.96845508e-01, -4.26457226e-01, -8.70084465e-02,\n",
       "          3.44893366e-01, -1.08226717e-01,  2.96658188e-01, -2.54610926e-01,\n",
       "          2.05015004e-01,  1.30701810e-03, -2.64513552e-01, -3.71359408e-01,\n",
       "         -6.54534817e+00,  3.44033003e-01, -6.28265619e-01, -3.90359372e-01,\n",
       "         -3.74380499e-03,  1.38988346e-02, -1.10473847e+00,  2.50362396e-01,\n",
       "         -1.42730415e-01, -1.43787280e-01, -6.84304774e-01, -1.25266463e-01,\n",
       "          2.66319692e-01, -2.42460966e-01,  1.17406607e+00, -2.62275636e-01,\n",
       "          3.84890854e-01, -5.94873130e-02,  5.12271821e-01, -4.21706140e-01,\n",
       "         -8.20131302e-02, -1.70712233e-01, -3.76368523e-01, -1.33617550e-01,\n",
       "         -4.63656873e-01,  1.44606352e-01,  6.28078580e-02,  4.53218967e-01,\n",
       "         -5.27208522e-02,  3.36382329e-01,  3.34930986e-01,  5.84990144e-01,\n",
       "         -2.28617251e-01, -7.35082150e-01,  9.48296487e-03,  1.49334863e-01,\n",
       "         -1.40943542e-01, -4.14748996e-01,  5.62612951e-01,  1.81827843e-01,\n",
       "          9.43385959e-02,  9.02374148e-01, -2.51428306e-01,  2.22430408e-01,\n",
       "          4.62301433e-01, -3.91607553e-01,  2.22456023e-01, -8.29707310e-02,\n",
       "         -2.76629448e-01, -4.06816214e-01,  3.53234321e-01,  1.20506354e-01,\n",
       "          2.48783782e-01,  4.30057198e-01,  2.68536925e-01, -3.22738290e-01,\n",
       "          5.41464210e-01, -1.24884091e-01, -3.52592170e-01,  2.66138792e-01,\n",
       "          3.05103004e-01,  5.39709210e-01, -4.03170466e-01, -9.22230184e-02,\n",
       "         -3.67029697e-01, -5.56353107e-02,  3.42724919e-01,  1.24198616e-01,\n",
       "          2.33401582e-01,  2.19020993e-01,  1.18174158e-01,  3.24965268e-01,\n",
       "          5.57205565e-02, -9.74518880e-02, -7.47020483e-01, -5.21122515e-01,\n",
       "          3.99736986e-02, -1.86739117e-01,  3.72997791e-01,  1.30057290e-01,\n",
       "          4.33022976e-02, -3.16648424e-01, -1.03869647e-01, -7.47140311e-03,\n",
       "          6.70401454e-01, -4.08043027e-01,  1.22825257e-01, -5.29386997e-02,\n",
       "         -4.34218258e-01,  9.74327624e-02,  2.07144350e-01, -4.42244977e-01,\n",
       "         -4.24851090e-01,  1.25362784e-01,  9.85965207e-02,  1.28973201e-01,\n",
       "          1.92059621e-01,  2.40244269e-01,  4.61459100e-01,  2.93663979e-01,\n",
       "         -2.92330027e-01, -2.43328273e-01,  6.51871115e-02,  7.89132863e-02,\n",
       "          2.98500150e-01, -3.13850999e-01, -1.29151666e+00, -4.41522121e-01,\n",
       "          4.51790169e-02,  2.87928164e-01, -1.51125491e-01, -2.93863952e-01,\n",
       "         -7.32447356e-02, -6.39618754e-01,  2.13040352e-01,  2.82041669e-01,\n",
       "          2.93939203e-01,  3.92971039e-01,  1.38366118e-01,  4.04028475e-01,\n",
       "          3.34911942e-01,  3.02247047e-01, -1.67425573e-02, -4.74340886e-01,\n",
       "         -3.83422226e-01,  5.10016203e-01,  8.26577485e-01,  1.02568477e-01,\n",
       "         -4.55805808e-02,  6.54738247e-01,  2.28678033e-01,  1.37159079e-01,\n",
       "         -9.24654827e-02, -5.07280707e-01,  7.63886422e-03, -1.93190306e-01,\n",
       "          2.26019546e-02,  6.83651119e-03,  1.40444070e-01, -2.16926634e-03,\n",
       "         -1.64991170e-02,  7.61926055e-01,  1.04769439e-01,  1.08303644e-01,\n",
       "         -3.02319497e-01, -1.37703270e-01,  9.11890090e-01, -5.01324177e-01,\n",
       "          6.45365596e-01, -4.21677828e-02,  1.70910358e-01,  5.22469059e-02,\n",
       "          2.17056334e-01,  1.22835696e+00,  1.27081245e-01, -3.88549387e-01,\n",
       "         -2.07857907e-01, -2.20693767e-01,  8.01500916e-01,  2.61110291e-02,\n",
       "          3.63306016e-01, -4.63799328e-01,  2.81473547e-01,  9.82702374e-02,\n",
       "          1.69417500e-01, -1.76126063e-01, -3.41224790e-01, -6.02618575e-01,\n",
       "         -7.25285292e-01,  1.41351372e-02,  3.83767903e-01,  1.22794938e+00,\n",
       "          4.29408193e-01, -2.58128107e-01, -5.95869660e-01, -1.92921594e-01,\n",
       "         -2.88852572e-01,  1.53073624e-01,  3.53883505e-01,  1.24512434e-01,\n",
       "          3.69547039e-01, -2.74194002e-01,  1.68305457e-01,  1.63014561e-01,\n",
       "         -3.48744243e-01,  1.55420899e-01,  5.74553013e-01,  3.73350680e-01,\n",
       "         -4.02397305e-01, -2.14556515e-01,  2.83107907e-03, -1.56753272e-01,\n",
       "          3.00476551e-01, -2.66283303e-02, -4.25655991e-02,  5.83125651e-01,\n",
       "         -8.27721953e-02,  2.02499509e-01,  4.15642470e-01,  3.81397679e-02,\n",
       "         -1.82508603e-01,  1.63246453e-01,  1.48924768e-01,  4.26800251e-01,\n",
       "          4.66284603e-01,  1.16664171e-01,  8.21240246e-02,  1.36114478e-01,\n",
       "         -1.70425683e-01, -4.47229445e-02,  1.90463886e-01,  5.00857353e-01,\n",
       "          2.27475315e-01,  3.36416721e-01, -1.73346981e-01,  2.61976540e-01,\n",
       "         -1.02565289e-01,  1.92886889e-01, -3.88153583e-01,  4.60113376e-01,\n",
       "          8.98428917e-01,  2.64218748e-01,  1.10806517e-01,  3.05772364e-01,\n",
       "          5.61460853e-02,  9.00289953e-01, -2.87446171e-01,  7.55993724e-02,\n",
       "          5.21408796e-01,  7.91677713e-01, -5.44265807e-02, -2.64682055e-01,\n",
       "          3.46329600e-01, -4.11061607e-02, -1.15725271e-01,  2.92501390e-01,\n",
       "         -1.48068532e-01, -3.03346038e-01, -3.22579741e-02, -1.23285927e-01,\n",
       "          4.27493602e-01, -6.64700985e-01, -1.29218474e-01,  5.52579343e-01,\n",
       "         -2.22595483e-02, -9.26547050e-02, -1.22475460e-01, -1.01420909e-01,\n",
       "          4.25300449e-01, -3.64310950e-01,  2.26495028e-01, -8.24260712e-02,\n",
       "          1.87453628e-02,  9.12605375e-02,  3.01520228e-01,  5.60680628e-01,\n",
       "         -7.50853717e-02, -1.50261045e-01, -3.20502996e-01,  8.28965008e-02,\n",
       "          4.98041749e-01, -7.14172125e-02,  3.59995574e-01,  4.34944093e-01,\n",
       "         -2.70076096e-01, -1.40984011e+00,  8.14644024e-02, -1.59098089e-01,\n",
       "          4.55307961e-02, -2.07887962e-01, -6.99192584e-01,  3.18697870e-01,\n",
       "         -6.34252369e-01,  2.21113712e-01, -2.50007808e-01,  1.36830091e-01,\n",
       "         -3.74922939e-02, -3.65554124e-01,  2.31785953e-01, -9.24658701e-02,\n",
       "         -1.84443921e-01, -2.94595331e-01,  2.30411142e-02, -6.69619516e-02,\n",
       "          2.20426035e+00,  9.14194286e-02,  2.26492643e-01, -1.47486746e-01,\n",
       "          1.32254958e-01, -1.09223378e+00, -9.51496214e-02,  4.36387479e-01,\n",
       "          6.24864399e-02, -5.28231025e-01, -1.52313590e-01,  9.14534628e-02,\n",
       "          4.88241494e-01, -1.42186821e+00, -2.10103452e-01, -5.40419698e-01,\n",
       "          9.73313749e-02,  5.34528419e-02, -2.03782424e-01,  7.57632330e-02,\n",
       "         -2.75360703e-01, -5.07763803e-01, -5.28392017e-01, -4.99650776e-01,\n",
       "         -3.36323351e-01,  1.75337970e-01, -1.09768331e-01,  5.61503232e-01,\n",
       "          3.62074465e-01,  5.57297096e-02, -1.59549534e-01, -5.20902798e-02,\n",
       "          2.19010562e-01,  9.11229253e-02,  8.91215444e-01, -5.29729873e-02,\n",
       "         -3.62470686e-01, -3.08555424e-01, -1.41877666e-01, -5.66913709e-02,\n",
       "          5.54823160e-01,  2.24721864e-01,  3.28532457e-02, -1.63407087e-01,\n",
       "         -3.93753350e-02, -1.27589643e-01, -2.41918549e-01, -1.72627866e-02,\n",
       "          6.96997270e-02, -3.83945495e-01,  4.01728898e-01,  4.41772223e-01,\n",
       "         -1.37958974e-02, -6.61438704e-02, -6.06827676e-01,  3.26661646e-01,\n",
       "         -8.38486195e-01,  6.06870711e-01, -3.74650955e-01,  1.31712124e-01,\n",
       "         -4.18629110e-01,  8.58621716e-01, -8.83512646e-02, -1.07507646e-01,\n",
       "          4.20210898e-01,  2.74669856e-01,  1.16677716e-01,  1.60895869e-01,\n",
       "          3.34834248e-01, -1.96968168e-01, -5.13731763e-02, -4.78067964e-01,\n",
       "         -4.39682841e-01, -5.16378760e-01, -7.16363490e-02,  5.15681803e-02,\n",
       "          1.73438191e-01,  2.66358584e-01, -1.61974549e-01, -4.09049392e-02,\n",
       "          5.34818649e-01,  2.94187367e-01,  3.56869131e-01,  2.97727108e-01,\n",
       "          2.10070848e-01, -1.36979789e-01,  4.45851147e-01,  1.13232955e-01,\n",
       "          6.63669348e-01,  1.26997977e-01, -6.55619144e-01,  1.19708240e-01,\n",
       "         -3.93473357e-01, -3.25431943e-01,  4.34662551e-01,  3.26182038e-01,\n",
       "          2.26627588e-02, -1.58187881e-01, -5.19608200e-01, -3.03419709e-01,\n",
       "         -1.49298996e-01,  3.15186262e-01, -5.78400493e-01, -1.49509251e-01,\n",
       "         -3.68623257e-01, -3.36147189e-01,  2.62191743e-02,  6.80889487e-02,\n",
       "          2.83265442e-01,  2.23671257e-01, -3.60259414e-01, -2.07606390e-01,\n",
       "          2.40071222e-01, -2.82378823e-01, -3.75251681e-01, -3.59918922e-01,\n",
       "         -3.34370017e-01, -9.31101441e-02,  3.08177859e-01,  2.11703241e-01,\n",
       "         -7.63330609e-03,  1.65970221e-01,  1.96248814e-02, -5.79220504e-02,\n",
       "         -2.86643118e-01, -2.93000102e-01, -2.25399703e-01, -5.27351648e-02,\n",
       "         -4.95403290e-01, -1.42921299e-01, -5.50922513e-01, -4.28099930e-02,\n",
       "         -6.73933804e-01,  2.58472562e-03,  3.73198509e-01,  2.12427974e-03],\n",
       "        dtype=float32)}]"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Preprocess.\n",
    "preprocessor = Preprocessor()\n",
    "preprocessor = preprocessor.fit(train_ds, column=\"class\")\n",
    "train_ds = preprocessor.transform(ds=train_ds)\n",
    "val_ds = preprocessor.transform(ds=val_ds)\n",
    "train_ds.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b> Data processing</b> \n",
    "\n",
    "See this extensive guide on [data loading and preprocessing](https://docs.ray.io/en/latest/train/user-guides/data-loading-preprocessing.html) for the last-mile preprocessing you need to do prior to training your models. However, Ray Data does support performant joins, filters, aggregations, etc., for the more structure data processing your workloads may need."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b> Store often, save compute</b> \n",
    "\n",
    "Store the preprocessed data into shared cloud storage to:\n",
    "- save a record of what this preprocessed data looks like\n",
    "- avoid triggering the entire preprocessing for each batch the model processes\n",
    "- avoid [`materialize`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.materialize.html) of the preprocessed data because you shouldn't force large data to fit in memory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:28:15,900\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:28:15,901\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)->Write]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "d53906cff1144d0da3d363fd92d4eb0e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=5032, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "503b1b75d6684e10a4cc2411c79ec604",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2e003c20484441aaa0112f26c52ee5b1",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "49337b465e12449099bf68198ddb6d88",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "4b1a2a4969b74f829621509815eab82d",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "5a30f731d6d3459d8f92c80c97be7da6",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns)->Write 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: before: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __path: [[\"doggos-dataset/train/basset/basset_10028.jpg\",\"doggos-dataset/train/basset/basset_10054.jpg\",\"doggos-dataset/train/basset/basset_10072.jpg\",\"doggos-dataset/train/basset/basset_10095.jpg\",\"doggos-dataset/train/basset/basset_10110.jpg\",...,\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_889.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9618.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_962.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_967.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9739.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __file_size: [[56919,36417,21093,23721,12511,...,19267,43746,29862,37592,32578]]\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: after: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __path: [[\"doggos-dataset/train/miniature_schnauzer/miniature_schnauzer_1287.jpg\",\"doggos-dataset/train/malamute/malamute_12294.jpg\",\"doggos-dataset/train/german_shepherd/german_shepherd_17240.jpg\",\"doggos-dataset/train/bull_mastiff/bull_mastiff_3793.jpg\",\"doggos-dataset/train/toy_poodle/toy_poodle_1077.jpg\",...,\"doggos-dataset/train/great_dane/great_dane_2009.jpg\",\"doggos-dataset/train/shih_tzu/shih_tzu_6106.jpg\",\"doggos-dataset/train/doberman/doberman_8834.jpg\",\"doggos-dataset/train/saint_bernard/saint_bernard_10215.jpg\",\"doggos-dataset/train/toy_poodle/toy_poodle_2883.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=20806, ip=10.0.188.182)\u001b[0m __file_size: [[43906,30606,51639,25912,17992,...,22982,70605,63651,26717,22505]]\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:28:31,996\tINFO dataset.py:4178 -- Data sink Parquet finished. 2880 rows and 5.9MB data written.\n",
      "2025-05-29 17:28:32,022\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:28:32,023\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)->Write]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "e9e96b341a8c4602aeabbacf562bf8b8",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=17181, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\u001b[32m [repeated 4x across cluster]\u001b[0m\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "04cefeec4d46433a9a8e9239b6f3d26e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "5470c35e5ed04171a8d49240e1df716d",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "26bfecdbc444451cbeeee92648007bc5",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "e4b1ae21f18945619160a21fbd7e8f9e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "ed2742be10bf44c8bbdb79a0324181c7",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns)->Write 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:28:43,840\tINFO dataset.py:4178 -- Data sink Parquet finished. 720 rows and 1.5MB data written.\n"
     ]
    }
   ],
   "source": [
    "# Write processed data to cloud storage.\n",
    "preprocessed_data_path = os.path.join(\"/mnt/cluster_storage\", \"doggos/preprocessed_data\")\n",
    "if os.path.exists(preprocessed_data_path):  # Clean up.\n",
    "    shutil.rmtree(preprocessed_data_path)\n",
    "preprocessed_train_path = os.path.join(preprocessed_data_path, \"preprocessed_train\")\n",
    "preprocessed_val_path = os.path.join(preprocessed_data_path, \"preprocessed_val\")\n",
    "train_ds.write_parquet(preprocessed_train_path)\n",
    "val_ds.write_parquet(preprocessed_val_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define the model--a simple two layer neural net with Softmax layer to predict class probabilities. Notice that it's all just base PyTorch and nothing else."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from pathlib import Path\n",
    "import torch\n",
    "import torch.nn as nn\n",
    "import torch.nn.functional as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class ClassificationModel(torch.nn.Module):\n",
    "    def __init__(self, embedding_dim, hidden_dim, dropout_p, num_classes):\n",
    "        super().__init__()\n",
    "        # Hyperparameters.\n",
    "        self.embedding_dim = embedding_dim\n",
    "        self.hidden_dim = hidden_dim\n",
    "        self.dropout_p = dropout_p\n",
    "        self.num_classes = num_classes\n",
    "\n",
    "        # Define layers.\n",
    "        self.fc1 = nn.Linear(embedding_dim, hidden_dim)\n",
    "        self.batch_norm = nn.BatchNorm1d(hidden_dim)\n",
    "        self.relu = nn.ReLU()\n",
    "        self.dropout = nn.Dropout(dropout_p)\n",
    "        self.fc2 = nn.Linear(hidden_dim, num_classes)\n",
    "\n",
    "    def forward(self, batch):\n",
    "        z = self.fc1(batch[\"embedding\"])\n",
    "        z = self.batch_norm(z)\n",
    "        z = self.relu(z)\n",
    "        z = self.dropout(z)\n",
    "        z = self.fc2(z)\n",
    "        return z\n",
    "\n",
    "    @torch.inference_mode()\n",
    "    def predict(self, batch):\n",
    "        z = self(batch)\n",
    "        y_pred = torch.argmax(z, dim=1).cpu().numpy()\n",
    "        return y_pred\n",
    "\n",
    "    @torch.inference_mode()\n",
    "    def predict_probabilities(self, batch):\n",
    "        z = self(batch)\n",
    "        y_probs = F.softmax(z, dim=1).cpu().numpy()\n",
    "        return y_probs\n",
    "\n",
    "    def save(self, dp):\n",
    "        Path(dp).mkdir(parents=True, exist_ok=True)\n",
    "        with open(Path(dp, \"args.json\"), \"w\") as fp:\n",
    "            json.dump({\n",
    "                \"embedding_dim\": self.embedding_dim,\n",
    "                \"hidden_dim\": self.hidden_dim,\n",
    "                \"dropout_p\": self.dropout_p,\n",
    "                \"num_classes\": self.num_classes,\n",
    "            }, fp, indent=4)\n",
    "        torch.save(self.state_dict(), Path(dp, \"model.pt\"))\n",
    "\n",
    "    @classmethod\n",
    "    def load(cls, args_fp, state_dict_fp, device=\"cpu\"):\n",
    "        with open(args_fp, \"r\") as fp:\n",
    "            model = cls(**json.load(fp))\n",
    "        model.load_state_dict(torch.load(state_dict_fp, map_location=device))\n",
    "        return model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ClassificationModel(\n",
      "  (fc1): Linear(in_features=512, out_features=256, bias=True)\n",
      "  (batch_norm): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)\n",
      "  (relu): ReLU()\n",
      "  (dropout): Dropout(p=0.3, inplace=False)\n",
      "  (fc2): Linear(in_features=256, out_features=36, bias=True)\n",
      ")\n"
     ]
    }
   ],
   "source": [
    "# Initialize model.\n",
    "num_classes = len(preprocessor.classes)\n",
    "model = ClassificationModel(\n",
    "    embedding_dim=512, \n",
    "    hidden_dim=256, \n",
    "    dropout_p=0.3, \n",
    "    num_classes=num_classes,\n",
    ")\n",
    "print (model)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Batching"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Take a look at a sample batch of data and ensure that tensors have the proper data type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from ray.train.torch import get_device"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def collate_fn(batch):\n",
    "    dtypes = {\"embedding\": torch.float32, \"label\": torch.int64}\n",
    "    tensor_batch = {}\n",
    "    for key in dtypes.keys():\n",
    "        if key in batch:\n",
    "            tensor_batch[key] = torch.as_tensor(\n",
    "                batch[key],\n",
    "                dtype=dtypes[key],\n",
    "                device=get_device(),\n",
    "            )\n",
    "    return tensor_batch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:28:44,684\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:28:44,685\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> LimitOperator[limit=3]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "e3a1f33944534daa9a8f91f35f0fc219",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=18764, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "e25f0afa45e542a99330818d1ca9fa7f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "841db49b81f848e09af429840c42cd71",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "357b73ae01144d5abec97a95f077117e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "f03438fca37944d6954e74fcfb58af00",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "97ef130d76784ce589f149795778a857",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns) 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "3104b8f0dd344b788b2f7d57f25c2170",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- limit=3 6: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: before: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __path: [[\"doggos-dataset/train/basset/basset_10028.jpg\",\"doggos-dataset/train/basset/basset_10054.jpg\",\"doggos-dataset/train/basset/basset_10072.jpg\",\"doggos-dataset/train/basset/basset_10095.jpg\",\"doggos-dataset/train/basset/basset_10110.jpg\",...,\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_889.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9618.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_962.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_967.jpg\",\"doggos-dataset/train/yorkshire_terrier/yorkshire_terrier_9739.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __file_size: [[56919,36417,21093,23721,12511,...,19267,43746,29862,37592,32578]]\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m >>> [DBG] partition_files: after: pyarrow.Table\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __path: string\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __file_size: int64\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m ----\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __path: [[\"doggos-dataset/train/great_dane/great_dane_1449.jpg\",\"doggos-dataset/train/toy_poodle/toy_poodle_1063.jpg\",\"doggos-dataset/train/malamute/malamute_6508.jpg\",\"doggos-dataset/train/cocker_spaniel/cocker_spaniel_12238.jpg\",\"doggos-dataset/train/siberian_husky/siberian_husky_14283.jpg\",...,\"doggos-dataset/train/golden_retriever/golden_retriever_3073.jpg\",\"doggos-dataset/train/saint_bernard/saint_bernard_1717.jpg\",\"doggos-dataset/train/golden_retriever/golden_retriever_5453.jpg\",\"doggos-dataset/train/siberian_husky/siberian_husky_10047.jpg\",\"doggos-dataset/train/cocker_spaniel/cocker_spaniel_9495.jpg\"]]\n",
      "\u001b[36m(ListFiles pid=20802, ip=10.0.188.182)\u001b[0m __file_size: [[29069,26986,31549,25414,24028,...,30150,43893,28905,55984,8928]]\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\n",
      "  tensor_batch[key] = torch.as_tensor(\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{'embedding': tensor([[-0.1340,  0.0319,  0.0136,  ...,  0.4513, -0.0579,  0.4205],\n",
       "         [ 0.0622,  0.0628, -0.1967,  ...,  0.3679, -0.1252,  0.4687],\n",
       "         [-0.0162,  0.0074, -0.1264,  ...,  1.3786,  0.2223,  0.1236]]),\n",
       " 'label': tensor([14,  1, 34])}"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Sample batch.\n",
    "sample_batch = train_ds.take_batch(batch_size=3)\n",
    "collate_fn(batch=sample_batch)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model registry"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a model registry in [Anyscale user storage](https://docs.anyscale.com/configuration/storage/#user-storage) to save the model checkpoints to. Use OSS MLflow but you can easily [set up other experiment trackers](https://docs.ray.io/en/latest/train/user-guides/experiment-tracking.html) with Ray."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "model_registry = \"/mnt/cluster_storage/mlflow/doggos\"\n",
    "if os.path.isdir(model_registry):\n",
    "    shutil.rmtree(model_registry)  # Clean up.\n",
    "os.makedirs(model_registry, exist_ok=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Training"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define the training workload by specifying the:\n",
    "- experiment and model parameters\n",
    "- compute scaling configuration\n",
    "- forward pass for batches of training and validation data\n",
    "- train loop for each epoch of data and checkpointing\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/trainer.png\" width=500>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Train loop config.\n",
    "experiment_name = \"doggos\"\n",
    "train_loop_config = {\n",
    "    \"model_registry\": model_registry,\n",
    "    \"experiment_name\": experiment_name,\n",
    "    \"embedding_dim\": 512,\n",
    "    \"hidden_dim\": 256,\n",
    "    \"dropout_p\": 0.3,\n",
    "    \"lr\": 1e-3,\n",
    "    \"lr_factor\": 0.8,\n",
    "    \"lr_patience\": 3,\n",
    "    \"num_epochs\": 20,\n",
    "    \"batch_size\": 256,\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Scaling config.\n",
    "num_workers = 2\n",
    "scaling_config = ray.train.ScalingConfig(\n",
    "    num_workers=num_workers,\n",
    "    use_gpu=True,\n",
    "    resources_per_worker={\"CPU\": 8, \"GPU\": 2},\n",
    "    accelerator_type=\"L4\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tempfile\n",
    "import mlflow\n",
    "import numpy as np\n",
    "from ray.train.torch import TorchTrainer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_epoch(ds, batch_size, model, num_classes, loss_fn, optimizer):\n",
    "    model.train()\n",
    "    loss = 0.0\n",
    "    ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)\n",
    "    for i, batch in enumerate(ds_generator):\n",
    "        optimizer.zero_grad()  # Reset gradients.\n",
    "        z = model(batch)  # Forward pass.\n",
    "        targets = F.one_hot(batch[\"label\"], num_classes=num_classes).float()\n",
    "        J = loss_fn(z, targets)  # Define loss.\n",
    "        J.backward()  # Backward pass.\n",
    "        optimizer.step()  # Update weights.\n",
    "        loss += (J.detach().item() - loss) / (i + 1)  # Cumulative loss\n",
    "    return loss"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def eval_epoch(ds, batch_size, model, num_classes, loss_fn):\n",
    "    model.eval()\n",
    "    loss = 0.0\n",
    "    y_trues, y_preds = [], []\n",
    "    ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)\n",
    "    with torch.inference_mode():\n",
    "        for i, batch in enumerate(ds_generator):\n",
    "            z = model(batch)\n",
    "            targets = F.one_hot(batch[\"label\"], num_classes=num_classes).float()  # one-hot (for loss_fn)\n",
    "            J = loss_fn(z, targets).item()\n",
    "            loss += (J - loss) / (i + 1)\n",
    "            y_trues.extend(batch[\"label\"].cpu().numpy())\n",
    "            y_preds.extend(torch.argmax(z, dim=1).cpu().numpy())\n",
    "    return loss, np.vstack(y_trues), np.vstack(y_preds)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_loop_per_worker(config):\n",
    "    # Hyperparameters.\n",
    "    model_registry = config[\"model_registry\"]\n",
    "    experiment_name = config[\"experiment_name\"]\n",
    "    embedding_dim = config[\"embedding_dim\"]\n",
    "    hidden_dim = config[\"hidden_dim\"]\n",
    "    dropout_p = config[\"dropout_p\"]\n",
    "    lr = config[\"lr\"]\n",
    "    lr_factor = config[\"lr_factor\"]\n",
    "    lr_patience = config[\"lr_patience\"]\n",
    "    num_epochs = config[\"num_epochs\"]\n",
    "    batch_size = config[\"batch_size\"]\n",
    "    num_classes = config[\"num_classes\"]\n",
    "\n",
    "    # Experiment tracking.\n",
    "    if ray.train.get_context().get_world_rank() == 0:\n",
    "        mlflow.set_tracking_uri(f\"file:{model_registry}\")\n",
    "        mlflow.set_experiment(experiment_name)\n",
    "        mlflow.start_run()\n",
    "        mlflow.log_params(config)\n",
    "\n",
    "    # Datasets.\n",
    "    train_ds = ray.train.get_dataset_shard(\"train\")\n",
    "    val_ds = ray.train.get_dataset_shard(\"val\")\n",
    "\n",
    "    # Model.\n",
    "    model = ClassificationModel(\n",
    "        embedding_dim=embedding_dim, \n",
    "        hidden_dim=hidden_dim, \n",
    "        dropout_p=dropout_p, \n",
    "        num_classes=num_classes,\n",
    "    )\n",
    "    model = ray.train.torch.prepare_model(model)\n",
    "\n",
    "    # Training components.\n",
    "    loss_fn = torch.nn.CrossEntropyLoss()\n",
    "    optimizer = torch.optim.Adam(model.parameters(), lr=lr)\n",
    "    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(\n",
    "        optimizer, \n",
    "        mode=\"min\", \n",
    "        factor=lr_factor, \n",
    "        patience=lr_patience,\n",
    "    )\n",
    "\n",
    "    # Training.\n",
    "    best_val_loss = float(\"inf\")\n",
    "    for epoch in range(num_epochs):\n",
    "        # Steps\n",
    "        train_loss = train_epoch(train_ds, batch_size, model, num_classes, loss_fn, optimizer)\n",
    "        val_loss, _, _ = eval_epoch(val_ds, batch_size, model, num_classes, loss_fn)\n",
    "        scheduler.step(val_loss)\n",
    "\n",
    "        # Checkpoint (metrics, preprocessor and model artifacts).\n",
    "        with tempfile.TemporaryDirectory() as dp:\n",
    "            model.module.save(dp=dp)\n",
    "            metrics = dict(lr=optimizer.param_groups[0][\"lr\"], train_loss=train_loss, val_loss=val_loss)\n",
    "            with open(os.path.join(dp, \"class_to_label.json\"), \"w\") as fp:\n",
    "                json.dump(config[\"class_to_label\"], fp, indent=4)\n",
    "            if ray.train.get_context().get_world_rank() == 0:  # only on main worker 0\n",
    "                mlflow.log_metrics(metrics, step=epoch)\n",
    "                if val_loss < best_val_loss:\n",
    "                    best_val_loss = val_loss\n",
    "                    mlflow.log_artifacts(dp)\n",
    "\n",
    "    # End experiment tracking.\n",
    "    if ray.train.get_context().get_world_rank() == 0:\n",
    "        mlflow.end_run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b> Minimal change to your training code</b> \n",
    "\n",
    "Notice that there isn't much new Ray Train code on top of the base PyTorch code. You specified how you want to scale out the training workload, load the Ray datasets, and then checkpoint on the main worker node and that's it. See these guides ([PyTorch](https://docs.ray.io/en/latest/train/getting-started-pytorch.html), [PyTorch Lightning](https://docs.ray.io/en/latest/train/getting-started-pytorch-lightning.html), [Hugging Face Transformers](https://docs.ray.io/en/latest/train/getting-started-transformers.html)) to see the minimal delta code needed to distribute your training workloads. See this extensive list of [Ray Train user guides](https://docs.ray.io/en/latest/train/user-guides.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load preprocessed datasets.\n",
    "preprocessed_train_ds = ray.data.read_parquet(preprocessed_train_path)\n",
    "preprocessed_val_ds = ray.data.read_parquet(preprocessed_val_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Trainer.\n",
    "train_loop_config[\"class_to_label\"] = preprocessor.class_to_label\n",
    "train_loop_config[\"num_classes\"] = len(preprocessor.class_to_label)\n",
    "trainer = TorchTrainer(\n",
    "    train_loop_per_worker=train_loop_per_worker,\n",
    "    train_loop_config=train_loop_config,\n",
    "    scaling_config=scaling_config,\n",
    "    datasets={\"train\": preprocessed_train_ds, \"val\": preprocessed_val_ds},\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b> Ray Train</b> \n",
    "\n",
    "- automatically handles **multi-node, multi-GPU** setup with no manual SSH setup or hostfile configs. \n",
    "- define **per-worker fractional resource requirements**, for example, 2 CPUs and 0.5 GPU per worker.\n",
    "- run on **heterogeneous machines** and scale flexibly, for example, CPU for preprocessing and GPU for training. \n",
    "- built-in **fault tolerance** with retry of failed workers and continue from last checkpoint.\n",
    "- supports Data Parallel, Model Parallel, Parameter Server, and even custom strategies.\n",
    "- [Ray Compiled graphs](https://docs.ray.io/en/latest/ray-core/compiled-graph/ray-compiled-graph.html) allow you to even define different parallelism for jointly optimizing multiple models like Megatron, DeepSpeed, etc., or only allow for one global setting.\n",
    "- You can also use Torch DDP, FSPD, DeepSpeed, etc., under the hood.\n",
    "\n",
    "[RayTurbo Train](https://docs.anyscale.com/rayturbo/rayturbo-train) offers even more improvement to the price-performance ratio, performance monitoring and more:\n",
    "- **elastic training** to scale to a dynamic number of workers, continue training on fewer resources, even on spot instances.\n",
    "- **purpose-built dashboard** designed to streamline the debugging of Ray Train workloads:\n",
    "    - Monitoring: View the status of training runs and train workers.\n",
    "    - Metrics: See insights on training throughput and training system operation time.\n",
    "    - Profiling: Investigate bottlenecks, hangs, or errors from individual training worker processes.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/train_dashboard.png\" width=700>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Train.\n",
    "results = trainer.fit()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can view experiment metrics and model artifacts in the model registry. You're using OSS MLflow so you can run the server by pointing to the model registry location:\n",
    "\n",
    "```bash\n",
    "mlflow server -h 0.0.0.0 -p 8080 --backend-store-uri /mnt/user_storage/mlflow/doggos\n",
    "```\n",
    "\n",
    "You can view the dashboard by going to the **Overview tab** > **Open Ports**. \n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/mlflow.png\" width=685>\n",
    "\n",
    "You also have the preceding Ray Dashboard and Train workload specific dashboards.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/train_metrics.png\" width=700>\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "run_id                                      c74f338f00434316bd209763b9636ced\n",
       "experiment_id                                             895161238195662889\n",
       "status                                                              FINISHED\n",
       "artifact_uri               file:///mnt/cluster_storage/mlflow/doggos/8951...\n",
       "start_time                                  2025-05-29 17:29:12.724000+00:00\n",
       "end_time                                    2025-05-29 17:29:25.486000+00:00\n",
       "metrics.lr                                                             0.001\n",
       "metrics.train_loss                                                   0.18294\n",
       "metrics.val_loss                                                    0.505507\n",
       "params.lr                                                              0.001\n",
       "params.batch_size                                                        256\n",
       "params.dropout_p                                                         0.3\n",
       "params.num_epochs                                                         20\n",
       "params.embedding_dim                                                     512\n",
       "params.hidden_dim                                                        256\n",
       "params.num_classes                                                        36\n",
       "params.lr_patience                                                         3\n",
       "params.class_to_label      {'basset': 0, 'bloodhound': 1, 'pomeranian': 2...\n",
       "params.lr_factor                                                         0.8\n",
       "params.experiment_name                                                doggos\n",
       "params.model_registry                     /mnt/cluster_storage/mlflow/doggos\n",
       "tags.mlflow.source.type                                                LOCAL\n",
       "tags.mlflow.user                                                         ray\n",
       "tags.mlflow.source.name    /home/ray/anaconda3/lib/python3.12/site-packag...\n",
       "tags.mlflow.runName                                          orderly-deer-47\n",
       "Name: 0, dtype: object"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Sorted runs.\n",
    "mlflow.set_tracking_uri(f\"file:{model_registry}\")\n",
    "sorted_runs = mlflow.search_runs(\n",
    "    experiment_names=[experiment_name], \n",
    "    order_by=[\"metrics.val_loss ASC\"])\n",
    "best_run = sorted_runs.iloc[0]\n",
    "best_run"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can easily wrap the training workload as a production grade [Anyscale Job](https://docs.anyscale.com/platform/jobs/) ([API ref](https://docs.anyscale.com/reference/job-api/)).\n",
    "\n",
    "**Note**: \n",
    "- This tutorial uses a `containerfile` to define dependencies, but you could easily use a pre-built image as well.\n",
    "- You can specify the compute as a [compute config](https://docs.anyscale.com/configuration/compute-configuration/) or inline in a [job config](https://docs.anyscale.com/reference/job-api#job-cli) file.\n",
    "- When you don't specify compute while launching from a workspace, this configuration defaults to the compute configuration of the workspace."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```bash\n",
    "# Production batch job.\n",
    "anyscale job submit --name=train-doggos-model \\\n",
    "  --containerfile=\"/home/ray/default/containerfile\" \\\n",
    "  --working-dir=\"/home/ray/default\" \\\n",
    "  --exclude=\"\" \\\n",
    "  --max-retries=0 \\\n",
    "  -- python doggos/train.py\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/train_job.png\" width=700>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Evaluation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This tutorial concludes by evaluating the trained model on the test dataset. Evaluation is essentially the same as the batch inference workload where you apply the model on batches of data and then calculate metrics using the predictions versus true labels. Ray Data is hyper optimized for throughput so preserving order isn't a priority. But for evaluation, this approach is crucial. Achieve this approach by preserving the entire row and adding the predicted label as another column to each row."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from urllib.parse import urlparse\n",
    "from sklearn.metrics import multilabel_confusion_matrix"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class TorchPredictor:\n",
    "    def __init__(self, preprocessor, model):\n",
    "        self.preprocessor = preprocessor\n",
    "        self.model = model\n",
    "        self.model.eval()\n",
    "\n",
    "    def __call__(self, batch, device=\"cuda\"):\n",
    "        self.model.to(device)\n",
    "        batch[\"prediction\"] = self.model.predict(collate_fn(batch))\n",
    "        return batch\n",
    "\n",
    "    def predict_probabilities(self, batch, device=\"cuda\"):\n",
    "        self.model.to(device)\n",
    "        predicted_probabilities = self.model.predict_probabilities(collate_fn(batch))\n",
    "        batch[\"probabilities\"] = [\n",
    "            {self.preprocessor.label_to_class[i]: prob for i, prob in enumerate(probabilities)}\n",
    "            for probabilities in predicted_probabilities\n",
    "        ]\n",
    "        return batch\n",
    "    \n",
    "    @classmethod\n",
    "    def from_artifacts_dir(cls, artifacts_dir):\n",
    "        with open(os.path.join(artifacts_dir, \"class_to_label.json\"), \"r\") as fp:\n",
    "            class_to_label = json.load(fp)\n",
    "        preprocessor = Preprocessor(class_to_label=class_to_label)\n",
    "        model = ClassificationModel.load(\n",
    "            args_fp=os.path.join(artifacts_dir, \"args.json\"), \n",
    "            state_dict_fp=os.path.join(artifacts_dir, \"model.pt\"),\n",
    "        )\n",
    "        return cls(preprocessor=preprocessor, model=model)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load and preproces eval dataset.\n",
    "artifacts_dir = urlparse(best_run.artifact_uri).path\n",
    "predictor = TorchPredictor.from_artifacts_dir(artifacts_dir=artifacts_dir)\n",
    "test_ds = ray.data.read_images(\"s3://doggos-dataset/test\", include_paths=True)\n",
    "test_ds = test_ds.map(add_class)\n",
    "test_ds = predictor.preprocessor.transform(ds=test_ds)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:30:05,501\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:30:05,502\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> TaskPoolMapOperator[MapBatches(TorchPredictor)] -> LimitOperator[limit=1]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "024577fc91c24c319b7f851d5ed13a89",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=23368, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "f6e61c791d0b43c9b33389e7ce886421",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "d40cfc5411b544cca860a8ab80dbfc64",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "a5629da8b54f4fb3a765323e83b5e832",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "ca31ff48749744ffa6db6f226935a499",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "90004f6896a44c9eb440d62dfd789d67",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns) 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "fa07f95ae3d64bb6b04a2d6a18472411",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(TorchPredictor) 6: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "c493fe1c9f3e46d2999e2ed7e5c5be62",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- limit=1 7: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(autoscaler +2m54s)\u001b[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.\n",
      "\u001b[36m(autoscaler +2m54s)\u001b[0m [autoscaler] [4xL4:48CPU-192GB] Attempting to add 1 node(s) to the cluster (increasing from 1 to 2).\n",
      "\u001b[36m(autoscaler +2m54s)\u001b[0m [autoscaler] [4xL4:48CPU-192GB] Launched 1 instances.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(MapBatches(TorchPredictor) pid=24018, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\n",
      "\u001b[36m(_MapWorker pid=23367, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\u001b[32m [repeated 3x across cluster]\u001b[0m\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'path': 'doggos-dataset/test/basset/basset_10288.jpg',\n",
       "  'class': 'basset',\n",
       "  'label': 0,\n",
       "  'embedding': array([-1.04914226e-01, -2.44790107e-01, -9.95984226e-02,  1.35369539e-01,\n",
       "         -5.52583262e-02, -5.80726117e-02,  1.91796213e-01,  1.56358749e-01,\n",
       "         -6.07913554e-01,  2.08769724e-01, -3.80894728e-02, -1.11314341e-01,\n",
       "         -1.96144953e-01, -6.14985377e-02,  5.18052638e-01,  2.08481997e-01,\n",
       "          1.18680000e+00,  2.00228184e-01, -2.38505781e-01,  7.44115859e-02,\n",
       "         -1.17920645e-01,  1.65981501e-02,  4.06986445e-01,  1.73044205e-02,\n",
       "         -7.19357654e-02, -2.49894172e-01,  5.69961220e-02, -2.07781866e-02,\n",
       "         -2.98084021e-01, -1.49074584e-01,  2.44871676e-02,  4.86775905e-01,\n",
       "          3.78374040e-01, -2.37519056e-01,  1.26715392e-01,  1.10406190e-01,\n",
       "          1.23483971e-01, -2.53295779e-01, -1.41814783e-01,  1.88360167e+00,\n",
       "         -4.67942834e-01, -1.71202302e-01,  2.93785751e-01,  9.53234285e-02,\n",
       "         -1.08036682e-01, -1.05388689e+00,  2.12952226e-01,  3.43122810e-01,\n",
       "         -9.08569694e-02, -6.02111407e-02,  1.57679915e-02,  1.13998108e-01,\n",
       "         -9.61575359e-02,  1.91041157e-01,  3.62989418e-02, -1.67392865e-02,\n",
       "          4.08946276e-01,  4.58515316e-01, -4.09091711e-01, -3.85877192e-01,\n",
       "          9.77702141e-01, -1.69140399e-02,  1.93178892e-01,  1.36374265e-01,\n",
       "         -2.66536981e-01, -6.00859225e-01, -5.44141121e-02,  1.52056098e-01,\n",
       "         -2.88875699e-01,  2.30367392e-01,  6.66390955e-02, -3.48750472e-01,\n",
       "          1.32896870e-01,  2.43517607e-01, -3.36737931e-03,  2.86127388e-01,\n",
       "         -3.56746495e-01, -1.14945166e-01,  1.51565254e-01,  4.90364283e-02,\n",
       "          7.63746351e-02, -2.27372758e-02,  2.54388034e-01, -5.34341276e-01,\n",
       "          3.07917535e-01,  4.43625785e-02,  3.23389471e-02, -3.16016316e-01,\n",
       "          3.49402726e-01,  1.40897527e-01, -3.93401444e-01, -6.98464036e-01,\n",
       "         -7.05318308e+00, -9.64105129e-02, -1.29345357e-01,  1.01153195e-01,\n",
       "          1.66764855e-03,  2.46858060e-01, -6.62657976e-01,  8.84698868e-01,\n",
       "         -2.41106033e-01, -1.67729259e-01, -2.76175499e-01, -1.06329359e-01,\n",
       "          4.68528211e-01, -2.96109021e-01,  5.00091314e-01, -1.51706636e-02,\n",
       "          1.84736028e-01, -4.76170719e-01,  2.78874636e-01, -7.43267775e-01,\n",
       "          3.29547435e-01,  9.67946649e-03, -2.46127650e-01, -2.13637337e-01,\n",
       "         -5.42725444e-01,  3.51179391e-01, -2.11807266e-01,  3.27731073e-01,\n",
       "          1.95189521e-01,  1.26088023e-01,  6.48026705e-01,  2.56954283e-01,\n",
       "          4.22701299e-01, -2.30529726e-01, -1.10486448e-01, -1.01444036e-01,\n",
       "          7.89590180e-03, -2.47239798e-01,  1.73558548e-01,  3.03944141e-01,\n",
       "         -5.77826388e-02,  9.45507646e-01, -4.95145321e-01,  2.86680460e-01,\n",
       "         -7.24357143e-02, -8.29980373e-01,  4.94337440e-01,  2.54262328e-01,\n",
       "          2.29299664e-01, -2.25471258e-02,  5.62192798e-01,  3.00549269e-01,\n",
       "         -2.83114985e-02,  3.84202898e-01,  2.89719164e-01,  3.54924083e-01,\n",
       "          2.66314536e-01, -3.58393282e-01, -3.72334421e-01,  5.86691380e-01,\n",
       "         -1.24578327e-01, -4.04102027e-01, -5.07451892e-01,  5.48313439e-01,\n",
       "         -3.14690828e-01, -1.80744618e-01,  2.89481074e-01,  5.75180650e-02,\n",
       "         -1.80966973e-01,  9.15100127e-02,  4.65520382e-01,  7.72563145e-02,\n",
       "          2.23801762e-01, -1.68021813e-01,  1.34750247e-01,  2.97952354e-01,\n",
       "          2.26987556e-01,  3.05611968e-01,  8.25501680e-02,  1.27266750e-01,\n",
       "          4.45462048e-01,  4.75219935e-01,  2.56612748e-02, -4.94095802e-01,\n",
       "          6.80847049e-01,  6.35498241e-02,  2.54887581e-01, -1.44208744e-01,\n",
       "         -5.48628032e-01,  3.29708159e-02,  4.15678322e-02, -2.43740305e-02,\n",
       "         -2.19443083e-01, -1.42820865e-01, -2.50694513e-01, -2.07656175e-01,\n",
       "         -1.79200202e-01,  3.50941271e-01,  6.33472502e-01,  3.80550802e-01,\n",
       "         -2.89177060e-01,  2.02112541e-01, -4.48559940e-01,  2.72922277e-01,\n",
       "          2.24376470e-01, -2.83806086e-01, -4.37650621e-01, -9.45881248e-01,\n",
       "          1.22266009e-01,  4.01373804e-02,  3.55452597e-01,  2.14725360e-01,\n",
       "         -3.82868111e-01, -3.58605534e-01,  1.33403212e-01,  3.17368060e-02,\n",
       "          8.55790824e-02,  8.59866962e-02,  9.54709649e-02, -3.47019315e-01,\n",
       "         -7.17681199e-02,  2.91243494e-01,  2.65088677e-01, -9.42255110e-02,\n",
       "         -1.77516475e-01,  2.28757486e-01,  9.07462239e-01, -1.03128985e-01,\n",
       "          7.33331919e-01,  2.64944196e-01, -1.47793457e-01,  3.05288285e-01,\n",
       "         -2.62914717e-01,  1.97676837e-01,  6.06522709e-02, -1.16444036e-01,\n",
       "          7.31720030e-03,  1.67819262e-01,  9.79750305e-02,  1.47580564e-01,\n",
       "         -4.00337100e-01,  4.21648234e-01, -8.30130056e-02, -6.39808536e-01,\n",
       "         -1.41640037e-01,  4.65196744e-02,  7.18399510e-02, -4.38913286e-01,\n",
       "          2.07776040e-01,  4.70564365e-02, -8.90249163e-02, -4.53151077e-01,\n",
       "         -2.14879364e-01,  2.44945109e-01,  3.16962332e-01, -3.41699898e-01,\n",
       "         -1.91379398e-01, -2.09521502e-02,  2.30608881e-01,  3.33673298e-01,\n",
       "          2.77272940e-01, -2.96297669e-01,  1.22105226e-01, -2.16432393e-01,\n",
       "          5.48318982e-01,  2.72968560e-01,  1.73096061e-01,  1.80758446e-01,\n",
       "         -3.40643704e-01,  2.62541354e-01,  1.24807537e-01, -7.05129027e-01,\n",
       "         -1.10301673e-02, -1.81341633e-01, -1.78187162e-01,  1.32018521e-01,\n",
       "         -4.31974642e-02,  3.50803137e-03,  1.59508467e-01,  9.21479613e-02,\n",
       "          4.54916626e-01,  2.72805393e-01, -5.77594995e-01, -2.87324101e-01,\n",
       "          1.66138545e-01,  8.66497457e-02,  9.02152061e-03, -3.78496647e-01,\n",
       "         -3.07205200e-01,  1.98497474e-02, -2.17410728e-01, -3.29560116e-02,\n",
       "         -9.36597586e-03,  1.02077954e-01, -5.64144433e-01,  2.59325027e-01,\n",
       "         -1.29755080e-01,  1.67370975e-01,  3.65311682e-01,  1.91536024e-02,\n",
       "         -1.80281207e-01, -1.50442317e-01,  3.04976583e-01,  3.71467024e-02,\n",
       "          1.42817795e-02,  1.84084043e-01,  2.46860459e-01,  1.05640717e-01,\n",
       "          4.84380350e-02, -3.53350788e-02, -4.98285890e-02,  2.02643991e-01,\n",
       "         -1.73173368e-01, -3.63763750e-01, -2.20462590e-01,  3.16181660e-01,\n",
       "          6.26122355e-02,  7.24825263e-02, -1.47105187e-01,  3.08875114e-01,\n",
       "          9.42751944e-01,  1.98151767e-02, -1.21705681e-02, -2.04986215e-01,\n",
       "          2.55928874e-01, -9.34748650e-02, -1.57367602e-01, -9.39194918e-01,\n",
       "          7.99043655e-01,  7.17636049e-01, -3.75675023e-01,  5.69819212e-01,\n",
       "         -1.33306980e-02,  5.30459821e-01, -5.34143150e-01,  2.46586308e-01,\n",
       "         -1.07142776e-01,  3.60272229e-02, -2.97878355e-01, -4.83343512e-01,\n",
       "          6.04178965e-01, -5.00948966e-01,  3.49492043e-01,  2.63356715e-02,\n",
       "          9.19317901e-02,  4.02334750e-01,  1.58838168e-01, -6.79962754e-01,\n",
       "         -2.58434951e-01, -4.40313101e-01,  3.03082943e-01,  3.24987531e-01,\n",
       "          5.39690316e-01,  5.20520747e-01,  4.50526476e-01,  4.25643712e-01,\n",
       "         -3.66918474e-01,  3.89405042e-01, -1.27459919e+00,  1.07020557e-01,\n",
       "         -2.60990113e-01, -1.43924713e-01,  7.54843205e-02,  9.26971912e-01,\n",
       "          3.27435076e-01, -1.17758083e+00,  1.98659807e-01, -2.22036242e-02,\n",
       "          7.09706426e-01,  2.66087741e-01,  1.21972799e-01,  3.83028448e-01,\n",
       "         -7.28927970e-01,  2.53533423e-01, -4.85364079e-01, -2.49552399e-01,\n",
       "         -6.45130798e-02, -7.29702055e-01,  4.32396650e-01,  2.20177352e-01,\n",
       "          2.00846523e-01, -9.86093953e-02, -1.90977231e-01,  2.79123366e-01,\n",
       "          1.66312718e+00,  4.78211671e-01, -2.51015574e-02,  2.72021919e-01,\n",
       "          7.38142252e-01, -1.70818880e-01,  8.71480852e-02,  5.43941200e-01,\n",
       "          1.69077545e-01, -3.87216568e-01, -2.42074981e-01,  2.69218683e-01,\n",
       "          3.44689578e-01, -8.90392721e-01, -7.69254029e-01, -3.58835727e-01,\n",
       "          5.44936657e-01, -5.26413918e-01, -7.02109486e-02, -9.80203599e-02,\n",
       "          1.44377463e-02,  2.74509192e-01, -2.26177007e-01, -4.58218694e-01,\n",
       "         -1.67407230e-01,  9.71812904e-02, -4.52374101e-01,  2.12075025e-01,\n",
       "          3.00378621e-01, -4.85781908e-01, -8.94448385e-02, -3.76136065e-01,\n",
       "          6.35547996e-01, -5.96616030e-01,  4.56893116e-01,  8.58043283e-02,\n",
       "         -4.65728343e-01,  2.77830362e-02,  3.81695107e-02, -2.30244398e-01,\n",
       "          2.88146555e-01,  4.18678015e-01,  2.95978993e-01, -3.73036265e-01,\n",
       "          2.28022814e-01,  3.33541095e-01, -1.05592966e-01, -3.15682322e-01,\n",
       "         -1.58445865e-01, -1.87164456e-01, -2.52392352e-01, -2.95361459e-01,\n",
       "          8.43314767e-01,  1.14070855e-01, -2.23936290e-02,  1.09956905e-01,\n",
       "         -3.88728410e-01,  1.39827192e-01,  2.20896304e-03, -1.90839782e-01,\n",
       "         -9.09138024e-01,  1.57145366e-01, -1.39061734e-02, -2.81434655e-02,\n",
       "          1.31378785e-01,  1.93338543e-02, -3.97078335e-01,  4.37846482e-02,\n",
       "          5.70612788e-01, -3.71424168e-01,  1.27987742e-01, -1.53837472e-01,\n",
       "         -1.62056625e-01, -2.61609107e-02, -9.74950373e-01, -2.85339534e-01,\n",
       "          1.63912773e-06, -5.19999683e-01, -1.39436722e-01, -1.61674783e-01,\n",
       "          2.82034755e-01,  5.65709114e-01,  1.78672597e-01,  2.84626663e-01,\n",
       "         -1.29202157e-02, -5.35536289e-01,  6.67075515e-02,  1.26035556e-01,\n",
       "          4.77381825e-01,  4.13615763e-01, -8.82375896e-01,  2.16037303e-01,\n",
       "         -7.70101696e-03, -1.17288440e-01,  3.86770785e-01,  3.40055168e-01,\n",
       "         -3.02812994e-01, -2.90828407e-01, -4.41879064e-01, -3.02490383e-01,\n",
       "          1.14623390e-01,  5.78144714e-02, -5.26804924e-01, -1.41756326e-01,\n",
       "          2.44007260e-03,  6.49953783e-02, -2.29362860e-01, -5.48199415e-01,\n",
       "         -7.99068511e-01, -3.52483392e-02,  4.28465232e-02, -5.25768161e-01,\n",
       "          1.63442791e-01, -2.11263210e-01, -6.78406954e-02, -2.00106874e-01,\n",
       "          4.71600831e-01, -4.66120839e-01,  2.91595399e-01, -5.46463057e-02,\n",
       "         -5.07597089e-01,  6.30303979e-01, -7.32594490e-01,  1.00498989e-01,\n",
       "         -7.07668304e-01, -8.52220505e-02, -5.60936481e-02, -1.76814944e-03,\n",
       "          3.38251948e-01, -1.68113366e-01, -1.64996088e-01,  1.30709276e-01,\n",
       "         -9.02270436e-01,  1.71258971e-01, -5.64924479e-02, -2.03938767e-01],\n",
       "        dtype=float32),\n",
       "  'prediction': 0}]"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# y_pred (batch inference).\n",
    "pred_ds = test_ds.map_batches(\n",
    "    predictor,\n",
    "    fn_kwargs={\"device\": \"cuda\"},\n",
    "    concurrency=4,\n",
    "    batch_size=64,\n",
    "    num_gpus=1,\n",
    "    accelerator_type=\"L4\",\n",
    ")\n",
    "pred_ds.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def batch_metric(batch):\n",
    "    labels = batch[\"label\"]\n",
    "    preds = batch[\"prediction\"]\n",
    "    mcm = multilabel_confusion_matrix(labels, preds)\n",
    "    tn, fp, fn, tp = [], [], [], []\n",
    "    for i in range(mcm.shape[0]):\n",
    "        tn.append(mcm[i, 0, 0])  # True negatives\n",
    "        fp.append(mcm[i, 0, 1])  # False positives\n",
    "        fn.append(mcm[i, 1, 0])  # False negatives\n",
    "        tp.append(mcm[i, 1, 1])  # True positives\n",
    "    return {\"TN\": tn, \"FP\": fp, \"FN\": fn, \"TP\": tp}\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:30:31,352\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-29_17-07-53_816345_69024/logs/ray-data\n",
      "2025-05-29 17:30:31,353\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[Map(add_class)->Map(Preprocessor.convert_to_label)] -> ActorPoolMapOperator[MapBatches(EmbeddingGenerator)] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> TaskPoolMapOperator[MapBatches(TorchPredictor)] -> TaskPoolMapOperator[MapBatches(batch_metric)] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "c1da670c8cb44291ac36977e111f1fe5",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(_MapWorker pid=24021, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "b9039ac18ba64a5793c5cbd367858a07",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ListFiles 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "461122f7e1fb4956b0dfb2b0046f5d8c",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- ReadFiles 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "7464589fe42a4e49b6eed34eabed6943",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Map(add_class)->Map(Preprocessor.convert_to_label) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "346fde2be402494a86044e2b03055102",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(EmbeddingGenerator) 4: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "46bf49fb058f4794b3e4271b27dea96d",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(drop_columns) 5: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "acb8ce0475f541e7ad55bc02f13ba63c",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(TorchPredictor) 6: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "df7edefe968e43e18e55b3be79dd95a8",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- MapBatches(batch_metric) 7: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "5ed05abb69304844af009a78ee551c50",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- Aggregate 8: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "ea3ead4416f142738ceaf2131644cf30",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Sort Sample 9:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "636d480c732a44a2a88cb995d08a6e81",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Shuffle Map 10:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "d236268ac1004cf8af090a35edc9ffb6",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Shuffle Reduce 11:   0%|          | 0.00/1.00 [00:00<?, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2f72f8cb23f94519b577300d0e60e769",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "- limit=1 12: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(MapBatches(TorchPredictor) pid=24063, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\n",
      "\u001b[36m(_MapWorker pid=24015, ip=10.0.153.142)\u001b[0m Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=31408, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 4x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=31754, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 4x across cluster]\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(autoscaler +3m34s)\u001b[0m [autoscaler] Cluster upscaled to {160 CPU, 13 GPU}.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-05-29 17:31:42,303\tWARNING issue_detector_manager.py:39 -- A task for operator MapBatches(TorchPredictor) with task index 12 has been hanging for >30.085022853999817s.\n",
      "2025-05-29 17:31:42,304\tWARNING issue_detector_manager.py:39 -- A task for operator MapBatches(TorchPredictor) with task index 13 has been hanging for >30.08502354700022s.\n",
      "2025-05-29 17:31:42,305\tWARNING issue_detector_manager.py:39 -- A task for operator MapBatches(TorchPredictor) with task index 14 has been hanging for >30.085023250000177s.\n",
      "2025-05-29 17:31:42,305\tWARNING issue_detector_manager.py:39 -- A task for operator MapBatches(TorchPredictor) with task index 15 has been hanging for >30.085023025999817s.\n",
      "2025-05-29 17:31:42,306\tWARNING issue_detector_manager.py:41 -- To disable issue detection, run DataContext.get_current().issue_detectors_config.detectors = [].\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=3391, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 4x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=32212, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 5x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=32466, ip=10.0.153.142)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 6x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=4236, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=4496, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 4x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=4905, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 5x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=5108, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 5x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(TorchPredictor) pid=5521, ip=10.0.179.42)\u001b[0m /tmp/ipykernel_108978/3214280880.py:6: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:203.)\u001b[32m [repeated 5x across cluster]\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "# Aggregated metrics after processing all batches.\n",
    "metrics_ds = pred_ds.map_batches(batch_metric)\n",
    "aggregate_metrics = metrics_ds.sum([\"TN\", \"FP\", \"FN\", \"TP\"])\n",
    "\n",
    "# Aggregate the confusion matrix components across all batches.\n",
    "tn = aggregate_metrics[\"sum(TN)\"]\n",
    "fp = aggregate_metrics[\"sum(FP)\"]\n",
    "fn = aggregate_metrics[\"sum(FN)\"]\n",
    "tp = aggregate_metrics[\"sum(TP)\"]\n",
    "\n",
    "# Calculate metrics.\n",
    "precision = tp / (tp + fp) if (tp + fp) > 0 else 0\n",
    "recall = tp / (tp + fn) if (tp + fn) > 0 else 0\n",
    "f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0\n",
    "accuracy = (tp + tn) / (tp + tn + fp + fn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Precision: 0.84\n",
      "Recall: 0.84\n",
      "F1: 0.84\n",
      "Accuracy: 0.98\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(autoscaler +7m44s)\u001b[0m [autoscaler] Downscaling node i-08777c6136bc406a6 (node IP: 10.0.179.42) due to node idle termination.\n",
      "\u001b[36m(autoscaler +7m44s)\u001b[0m [autoscaler] Cluster resized to {112 CPU, 9 GPU}.\n",
      "\u001b[36m(autoscaler +34m54s)\u001b[0m [autoscaler] [4xL4:48CPU-192GB] Attempting to add 1 node(s) to the cluster (increasing from 1 to 2).\n",
      "\u001b[36m(autoscaler +34m59s)\u001b[0m [autoscaler] [4xL4:48CPU-192GB] Launched 1 instances.\n",
      "\u001b[36m(autoscaler +35m44s)\u001b[0m [autoscaler] Cluster upscaled to {160 CPU, 13 GPU}.\n",
      "\u001b[36m(autoscaler +39m39s)\u001b[0m [autoscaler] Downscaling node i-0166a13ff9c07f0b4 (node IP: 10.0.158.79) due to node idle termination.\n",
      "\u001b[36m(autoscaler +39m39s)\u001b[0m [autoscaler] Cluster resized to {112 CPU, 9 GPU}.\n"
     ]
    }
   ],
   "source": [
    "print(f\"Precision: {precision:.2f}\")\n",
    "print(f\"Recall: {recall:.2f}\")\n",
    "print(f\"F1: {f1:.2f}\")\n",
    "print(f\"Accuracy: {accuracy:.2f}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
